Foreword
- 阅读前需要了解
DelayQueue
的基本使用. 可以参考 Guide to DelayQueue | Baeldung
SimpleDelayQueue
DelayQueue
的基本思想很简单:
- 内部容器为一个优先队列, 优先条件为出队列时间. 即应该出队列的时间越早越优先.
- 通过
Condition#awaitNanos
来实现时间上的延迟. 举例, 假设队列中只有一个元素, 需要延迟1秒. 那么试图获取此元素的线程就需要await(1, TimeUnit.SECONDS)
DelayQueue
的成员非常少, 都列在下方. 其中的leader
可以暂时忽略.
1 | private transient final ReentrantLock lock = new ReentrantLock(); |
值得注意的一点是, 虽然DelayQueue
是BlockingQueue
的实现, 但是DelayQueue
的所有插入方法都不会阻塞, put
方法也只是调用了offer
而已:
Inserts the specified element into this delay queue. As the queue is unbounded this method will never block.
如果不考虑leader
, 可以仅仅基于lock
, awailable
,q
实现一个更加简单的延迟队列. 我们只实现两个最核心的方法:
1 | /** |
offer
offer
主体可以说是非常简单了:
- 向优先队列中插入元素
- 如果刚刚插入的元素为队列头部. 那么则对其他线程进行
signal
. 可以猜想在典型的生产线程/消费线程情况下, 没有拿到元素的线程应该是在await
状态, 所以signal
唤醒其中一个消费线程是必要的. 而如果元素不是队列的head, 那么通常表示此元素还不需要出队列, 没有必要白白唤醒一个线程. (TODO)
take
take
的逻辑可以简单看一眼. 如果感觉不清晰没关系, 后面的举例更明了. 也可以参考流程图:
举个栗子
假设实现一种延迟指定秒的Delay
, new FixedSecondsDelay(5)
表示延迟5秒. 如果对实现感兴趣可以看如下代码:
1 | /** |
调用代码如下:
1 | DelayQueue<FixedSecondsDelay> delayQueue = new DelayQueue<>(); |
整个过程中涉及到了三个线程, 分别为循环中创建的消费线程Thread0
, Thread1
和主线程Main
. 接下来对整个流程进行流水账说明:
1. `Thread0`获取锁之后, 马上进入了`await`, 而此操作释放了锁, 则`Thread1`进入, 同样进入`await`状态.
2. 接下来`Main`线程, 放入了一个两秒后可以出队列的元素, 此时只有一个元素, 故亦为队列头元素. 所以会`signal`一个等待线程, 假设为`Thread1`
3. 被唤醒的`Thread1` 进入了下次循环, 而此时头部元素不为空, 但是还没到时间, 仍需2秒才能出队列, 故`Thread1`开始`awaitNanos(delay)` (2秒) , 结束`awaitNanos`之后, 下次循环即可取出元素. 随机返回.
制造一个更加复杂一点的场景, delayQueue.put(new FixedSecondsDelay(2));
之后再追加一个delayQueue.put(new FixedSecondsDelay(5));
. 那么和上方, 第1, 2步是相同的. 第3步有一些差别:
3. 5秒出队列的元素, 由于不是队列头, 所以并没有去唤醒任一线程. 但是当`Thread1`结束之后, 会去`signal`等待的`Thread0`来继续处理此元素.
Leader - follower
第一印象
Thread designated to wait for the element at the head of the queue. This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting. When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
这是关于leader
的前半部分注释. 实际上”timed_waiting”以及”waits only for the next delay to elapse”的实际行为都是awaitNanos
. 相应的, “await indefinitely”即await
. 加深一下印象:
- leader表示等待处理队列head元素的线程
- 减少不必要的`awaitNanos`.
总体来讲, leader
的作用就是尽量让线程处于await
状态, 而不是await
. 可想而知这基于一个前提: awaitNanos
会带来不必要的性能消耗.
如何减少awaitNanos
对比DelayQueue
和SimpleDelayQueue
. 前者多了这些判断
offer
中如果为头元素, 会将leader
设置为null
take
中判断leader != null
, 会进入await
. 并且awaitNanos
前后分别会占用/放弃leader
位置.
当面临如下场景时:
1 | DelayQueue<Delayed> delayQueue = new DelayQueue<>(); |
假设两个线程为Thread0
, Thread1
. 其中一个先获取锁之后, 占用了leader
, 开始awaitNanos
. 而另一个线程判断当前leader
不为空, 则进入了await
状态. 相比之下SimpleDelayQueue
在这种场景中, 两个线程都在awaitNanos
.
Leader Summary
- 在处理队列head的线程才能是
leader
. 如果队列head更换, 那么对应线程的leader
位置就不保了. - 一个线程成为
leader
之后, 它awaitNanos
, 其他线程await
(这句话在上面的case适用, 但是不是所有场景都如此) leader
拿完元素走人之后, 要signal
其他线程(follower).